/**
 * Copyright (C) @2014 Webank Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.webank.framework.retry.biz.service.impl;

import java.util.List;
import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.support.ReloadableResourceBundleMessageSource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import cn.webank.framework.biz.service.support.WeBankServiceDispatcher;
import cn.webank.framework.dto.AsyncRMBMessage;
import cn.webank.framework.dto.BizError;
import cn.webank.framework.dto.BizErrors;
import cn.webank.framework.mapper.JsonMapper;
import cn.webank.framework.message.integration.RMBSao;
import cn.webank.framework.retry.dto.RetryMessageDTO;
import cn.webank.framework.retry.integration.RetryDAO;
import cn.webank.framework.retry.model.RetryMessage;
import redis.clients.jedis.ShardedJedis;

/**
 * Message retry service backed by Redis.
 * <p>
 * {@link #scan()} walks every Jedis handle exposed by the {@link RetryDAO}, reads the queued retry
 * messages and, per message, either drops it (expired / over the retry threshold), dispatches it
 * asynchronously on the retry task executor, or puts it back in the queue for a later scan.
 *
 * @author jonyang
 *
 */
@Service("cn.webank.framework.retry.biz.service.RetryMessageService")
public class RetryMessagePojoService {
	private static final Logger LOG = LoggerFactory.getLogger(RetryMessagePojoService.class);

	/**
	 * Executor on which retried messages are dispatched.
	 */
	@Autowired
	@Qualifier("retryTaskExecutor")
	private ThreadPoolTaskExecutor taskExecutor;

	/**
	 * Service dispatcher used to re-invoke the target business service.
	 */
	@Autowired
	@Qualifier("cn.webank.framework.biz.service.support.WeBankServiceDispatcher")
	private WeBankServiceDispatcher serviceDispatcher;

	/**
	 * Internationalized message source used to render business error codes.
	 */
	@Autowired
	@Qualifier("messageSource")
	private ReloadableResourceBundleMessageSource bundleMessageSource;

	// Injected but not used by any code path in this class.
	@Autowired
	@Qualifier("cn.webank.framework.message.integration.RMBSao")
	private RMBSao rmbSao;

	/**
	 * Data access object for the Redis retry queue.
	 */
	@Autowired
	@Qualifier("cn.webank.framework.retry.integration.dao.RetryDAO")
	private RetryDAO retryDao;

	/**
	 * Namespace passed to every {@link RetryDAO} call (property {@code jedis.retry.namespace}).
	 */
	@Value("${jedis.retry.namespace}")
	private String namespace;

	/**
	 * Queue name passed to every {@link RetryDAO} call (property {@code jedis.retry.queue}).
	 */
	@Value("${jedis.retry.queue}")
	private String queueName;

	private JsonMapper jsonMapper = JsonMapper.nonDefaultMapper();

	// Scheduling is disabled here, so this method must be triggered from elsewhere.
	// NOTE(review): the original note said "scan every 10 minutes", but the disabled cron expression
	// below fires every 10 seconds -- confirm the intended interval before re-enabling it.
	// @Scheduled(cron = "0/10 * * * * ?")
	/**
	 * Scans the retry queue of every Jedis handle and processes the messages found there.
	 * <p>
	 * The queue size is sampled once per handle and at most that many messages are read, so messages
	 * that are put back during the scan cannot keep the loop running forever. A failure while
	 * processing one message is logged and does not stop the scan.
	 */
	public void scan() {
		List<ShardedJedis> jedises = retryDao.getJedises();
		for (ShardedJedis jedis : jedises) {
			long size = retryDao.getRetryMessageSize(namespace, queueName, jedis);
			// long index: the size is a long, an int counter could overflow before reaching it
			for (long i = 0; i < size; i++) {
				String msg = retryDao.getRetryMessage(namespace, queueName, jedis);
				if (msg == null) {
					continue;
				}
				processMessage(msg);
			}
		}
	}

	/**
	 * Deserializes one raw queue entry and routes it: drop, execute now, or put back for later.
	 * <p>
	 * Any failure is logged together with the raw payload instead of being propagated, because the
	 * entry has already been read from the queue (presumably removed -- confirm against
	 * {@link RetryDAO}) and the log line is the only remaining trace of it.
	 *
	 * @param msg
	 *            raw JSON of a {@link RetryMessageDTO}, never {@code null}
	 */
	private void processMessage(String msg) {
		try {
			RetryMessageDTO<?> retryDto = jsonMapper.fromJson(msg, RetryMessageDTO.class);
			if (retryDto == null) {
				LOG.error("retry message could not be deserialized, dropped. message:" + msg);
				return;
			}

			RetryMessage retryMessage = new RetryMessage(retryDto);
			if (retryMessage.isExpired() || retryMessage.isOverThreshHold()) {
				// TODO send to solace error queue; until then at least leave a trace of the drop
				LOG.warn("retry message expired or over threshold, dropped. key:{},message:{}", retryDto.getKey(),
						msg);
			} else if (retryMessage.isEnableExecute()) {
				executeMessage(retryDto);
			} else {
				// not due yet: put it back so that a later scan picks it up again
				retryDao.saveRetryMessage(namespace, queueName, retryDto.getKey(), jsonMapper.toJson(retryDto));
			}
		} catch (Exception e) {
			LOG.error("handle retry message error,message:" + msg, e);
		}
	}

	/**
	 * Rebuilds the original asynchronous message from the retry record and submits it to the task
	 * executor.
	 * <p>
	 * If the payload cannot be rebuilt the error is logged and the message is not re-queued. If the
	 * executor refuses the task, the unchanged record is put back in the retry queue so that the
	 * attempt is not lost.
	 *
	 * @param retryDto
	 *            retry record whose payload should be dispatched again
	 */
	private void executeMessage(RetryMessageDTO<?> retryDto) {
		String bizSeqNos = "";
		try {
			String msg = retryDto.getRetryObjectJson();
			final AsyncRMBMessage<?> asyncMessage = (AsyncRMBMessage<?>) jsonMapper.fromJson(msg,
					retryDto.getRetryObjectClass());

			bizSeqNos = asyncMessage.getBizSeqNo() + "\n";

			Runnable task = constructRunnable(namespace, queueName, asyncMessage, retryDto, serviceDispatcher,
					bundleMessageSource, retryDao, jsonMapper);

			try {
				// dispatch service
				taskExecutor.submit(task);
			} catch (RuntimeException rejected) {
				// The task never ran, so the retry counters are untouched; re-queue the record as is.
				LOG.error("submit retry task failed, message re-queued. key:" + retryDto.getKey(), rejected);
				retryDao.saveRetryMessage(namespace, queueName, retryDto.getKey(), jsonMapper.toJson(retryDto));
			}
		} catch (Exception e) {
			LOG.error("system error,bizSeqNos:" + bizSeqNos, e);
		}
	}

	/**
	 * Builds the task that performs one retry attempt.
	 * <p>
	 * The task increments the retry counter and the last-execution timestamp on the record, then
	 * dispatches the message. Business errors reported through {@link BizErrors} are logged only; an
	 * exception thrown by the dispatch is logged and the updated record is put back in the retry
	 * queue.
	 *
	 * @param namespace
	 *            retry namespace used when re-queuing
	 * @param queueName
	 *            retry queue used when re-queuing
	 * @param asyncMessage
	 *            message to dispatch
	 * @param retryDto
	 *            retry record the message was rebuilt from
	 * @param serviceDispatcher
	 *            dispatcher that invokes the business service
	 * @param bundleMessageSource
	 *            message source used to render business error codes
	 * @param retryDao
	 *            DAO used when re-queuing
	 * @param jsonMapper
	 *            mapper used to serialize the record when re-queuing
	 * @return the retry task, ready to be submitted to an executor
	 */
	private static Runnable constructRunnable(final String namespace, final String queueName,
			final AsyncRMBMessage<?> asyncMessage, final RetryMessageDTO<?> retryDto,
			final WeBankServiceDispatcher serviceDispatcher,
			final ReloadableResourceBundleMessageSource bundleMessageSource, final RetryDAO retryDao,
			final JsonMapper jsonMapper) {

		return new Runnable() {
			@Override
			public void run() {
				try {
					// update the retry count and the time of the latest attempt
					long currentTime = System.currentTimeMillis();
					retryDto.addRetryTimes();
					retryDto.setLastExecutedTime(currentTime);

					BizErrors errors = new BizErrors();
					// TODO optimize: the payload is serialized twice on this path
					serviceDispatcher.dispatch(asyncMessage.getSysHeader().getOrganizationId(),
							asyncMessage.getServiceId(), asyncMessage.getScenario(),
							asyncMessage.getSysHeader().getVersion(), asyncMessage.getRmbMessage(), errors);

					// handle result
					if (errors.hasErrors()) {
						StringBuilder sb = new StringBuilder();
						boolean isFirst = true;
						for (BizError error : errors.getAllErrors()) {
							if (!isFirst) {
								sb.append("<br />");
							}
							sb.append(bundleMessageSource.getMessage(error.getCode(), error.getArguments(), "业务异常",
									Locale.CHINESE));
							isFirst = false;
						}

						LOG.error("biz error:" + sb.toString() + ",message:" + retryDto.getRetryObjectJson());

						// handleBizException(retryDto);
					} else {
						LOG.debug("retry message success. key:{} ", retryDto.getKey());
					}
				} catch (Exception e) {
					// the business service is expected to record the failing payload itself
					LOG.error("distach service error,message:" + retryDto.getRetryObjectJson(), e);
					handleSysException(namespace, queueName, retryDto, retryDao, jsonMapper);
				}
			}
		};
	}

	/**
	 * Puts a record whose dispatch failed with an exception back in the retry queue.
	 * <p>
	 * This runs inside a task whose {@code Future} is discarded, so an exception escaping from here
	 * would never be seen; a failing re-queue is therefore logged with the payload instead.
	 *
	 * @param namespace
	 *            retry namespace
	 * @param queueName
	 *            retry queue
	 * @param retryDto
	 *            record to re-queue (already carrying the updated retry count)
	 * @param retryDao
	 *            DAO used to store the record
	 * @param jsonMapper
	 *            mapper used to serialize the record
	 */
	private static void handleSysException(String namespace, String queueName, RetryMessageDTO<?> retryDto,
			RetryDAO retryDao, JsonMapper jsonMapper) {
		try {
			retryDao.saveRetryMessage(namespace, queueName, retryDto.getKey(), jsonMapper.toJson(retryDto));
		} catch (RuntimeException saveError) {
			LOG.error("re-queue retry message failed, message lost. key:" + retryDto.getKey() + ",message:"
					+ retryDto.getRetryObjectJson(), saveError);
		}
	}

}
